# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Abstract class providing a common interface to all
discrete operators working on the Host backend.
* :class:`~hysop.backend.host.host_operator.HostOperator` is an abstract class
used to provide a common interface to all discrete operators working with the
host backend.
"""
from abc import ABCMeta
from contextlib import contextmanager
from hysop.tools.decorators import debug
from hysop.tools.htypes import check_instance, first_not_None
from hysop.constants import ComputeGranularity, Backend
from hysop.core.graph.computational_operator import ComputationalGraphOperator
from hysop.topology.topology_descriptor import TopologyDescriptor
class HostOperatorBase(ComputationalGraphOperator, metaclass=ABCMeta):
    """
    Abstract base class for discrete operators working on cpu.

    The extra ``cl_env`` keyword parameter is accepted by both ``__new__``
    and ``__init__`` and is ignored: it is not forwarded to the parent class.
    """

    @debug
    def __new__(cls, cl_env=None, **kwds):
        # cl_env is dropped here; only the remaining keywords go up the MRO.
        instance = super().__new__(cls, **kwds)
        return instance

    @debug
    def __init__(self, cl_env=None, **kwds):
        """
        Create the common attributes of all host operators.

        All input and output variable topologies should be of kind
        Backend.HOST and share the same HostEnvironment.

        Parameters
        ----------
        cl_env: optional
            Accepted for signature compatibility and ignored.
        kwds: dict
            Keyword arguments forwarded unchanged to the parent initializer.
        """
        super().__init__(**kwds)
class HostOperator(HostOperatorBase, metaclass=ABCMeta):
    """
    Abstract class for discrete operators working on cpu.

    HostOperator ignores the extra cl_env keyword parameter (this is
    inherited from HostOperatorBase) and enforces the HOST backend.
    """

    @classmethod
    def supported_backends(cls):
        """
        Return the backends that this operator's topologies can support.

        A new set containing only Backend.HOST is built on every call.
        """
        backends = set()
        backends.add(Backend.HOST)
        return backends
class OpenClMappedMemoryObjectGetter:
    """
    Accessor over the objects an OpenClMappable has mapped to host.

    Indexing an instance with a key forwards to
    ``OpenClMappable.get_mapped_object(key=key)`` on the wrapped object, and
    the ``evt`` property gives back the event passed at construction.
    """

    def __new__(cls, obj, evt, **kwds):
        # obj and evt are consumed by __init__; only extra keywords go up.
        return super().__new__(cls, **kwds)

    def __init__(self, obj, evt, **kwds):
        """
        Parameters
        ----------
        obj: OpenClMappable
            Object whose mapped memory objects are exposed through indexing.
        evt:
            Event stored as-is and exposed through the evt property.
        kwds: dict
            Keyword arguments forwarded to the parent initializer.
        """
        super().__init__(**kwds)
        check_instance(obj, OpenClMappable)
        self.__obj, self.__evt = obj, evt

    @property
    def evt(self):
        """Event that was given to the constructor."""
        return self.__evt

    def __getitem__(self, key):
        """Return the mapped object registered under key."""
        mappable = self.__obj
        return mappable.get_mapped_object(key=key)
class OpenClMappable:
    """
    Extend host operator capabilities to work on mapped opencl buffers.

    This is a mixin: when enable_opencl_host_buffer_mapping is set, the
    instance must also be a HostOperator (asserted in __init__).

    Device memory objects are registered under a key with
    register_mappable_object(), optional derived views are registered with
    register_data_getter(), and both become reachable on the host inside::

        with self.map_objects_to_host() as mapped:
            data = mapped[key]

    When nothing has been registered, the context manager yields None.
    """

    @classmethod
    def supported_backends(cls):
        """
        Return the backends supported by the parent class plus Backend.OPENCL.
        """
        # Work on a copy so that the set handed out by the parent class is
        # never mutated in place, whatever the parent chooses to return.
        backends = set(super().supported_backends())
        backends.add(Backend.OPENCL)
        return backends

    @debug
    def create_topology_descriptors(self):
        """
        Build the topology descriptors of this operator.

        When opencl host buffer mapping is enabled, every input and output
        field descriptor is rebuilt for the OPENCL backend with this
        operator's cl_env. Otherwise the parent implementation is used.
        """
        if not self.enable_opencl_host_buffer_mapping:
            super().create_topology_descriptors()
            return

        def enforce_opencl(fields):
            # Iterate over a snapshot because values are replaced in place.
            for field, handle in list(fields.items()):
                fields[field] = TopologyDescriptor.build_descriptor(
                    backend=Backend.OPENCL,
                    operator=self,
                    field=field,
                    handle=handle,
                    cl_env=self.cl_env,
                )

        # enforce opencl topology on host operator
        enforce_opencl(self.input_fields)
        enforce_opencl(self.output_fields)

    def __new__(
        cls,
        cl_env=None,
        mpi_params=None,
        enable_opencl_host_buffer_mapping=False,
        **kwds,
    ):
        # Only mpi_params and the extra keywords are forwarded to the parent.
        return super().__new__(cls, mpi_params=mpi_params, **kwds)

    def __init__(
        self,
        cl_env=None,
        mpi_params=None,
        enable_opencl_host_buffer_mapping=False,
        **kwds,
    ):
        """
        Parameters
        ----------
        cl_env: optional
            OpenCL environment. It is checked to be an OpenClEnvironment
            only later, when OpenCL discrete fields are registered.
        mpi_params: optional
            Forwarded to the parent initializer. When cl_env is given and
            mpi_params is None, cl_env.mpi_params is used; when both are
            given they must compare equal.
        enable_opencl_host_buffer_mapping: bool
            Whether topologies should be enforced to the OPENCL backend.
            Requires the instance to be a HostOperator.
        kwds: dict
            Keyword arguments forwarded to the parent initializer.
        """
        if enable_opencl_host_buffer_mapping:
            msg = "OpenClMappable is an interface dedicated to extend HostOperator."
            assert isinstance(self, HostOperator), msg

        if cl_env is not None:
            if mpi_params is None:
                mpi_params = cl_env.mpi_params
            else:
                msg = "mpi_params does not match cl_env.mpi_params."
                assert mpi_params == cl_env.mpi_params, msg

        super().__init__(mpi_params=mpi_params, **kwds)

        self.__cl_env = cl_env
        self.__enable_opencl_host_buffer_mapping = enable_opencl_host_buffer_mapping

        self.__mapped = False
        self.__registered_objects = {}
        self.__registered_getters = {}
        self.__mapped_objects = {}

    def __del__(self):
        # The finalizer also runs for instances whose __init__ raised before
        # the mapping state was created; there is nothing to unmap then and
        # touching the missing attributes would raise inside the finalizer.
        try:
            has_state = self.__mapped_objects is not None
        except AttributeError:
            has_state = False
        if has_state:
            self.unmap_objects(force=True)

    @property
    def cl_env(self):
        """OpenCL environment given at construction (may be None)."""
        return self.__cl_env

    @property
    def enable_opencl_host_buffer_mapping(self):
        """Flag given at construction."""
        return self.__enable_opencl_host_buffer_mapping

    def setup(self, **kwds):
        """Run the parent setup, then register the OpenCL discrete fields."""
        super().setup(**kwds)
        self._register_fields()

    def _register_fields(self):
        """
        Register every OpenCL-backed discrete field of this operator.

        The underlying discrete scalar fields are registered as mappable
        objects (READ for inputs, WRITE for outputs, both when a field is
        both), and every field view is registered as a data getter on top of
        its discrete field.
        """
        from hysop.fields.discrete_field import (
            DiscreteScalarField,
            DiscreteScalarFieldView,
        )

        ivfields = {
            f
            for f in self.input_discrete_fields.values()
            if f.backend.kind == Backend.OPENCL
        }
        ovfields = {
            f
            for f in self.output_discrete_fields.values()
            if f.backend.kind == Backend.OPENCL
        }
        check_instance(ivfields, set, values=DiscreteScalarFieldView)
        check_instance(ovfields, set, values=DiscreteScalarFieldView)

        vfields = ivfields.union(ovfields)
        if not vfields:
            return

        assert self.cl_env is not None, "No opencl environment has been given."
        from hysop.backend.device.opencl.opencl_env import OpenClEnvironment

        check_instance(self.cl_env, OpenClEnvironment)

        from hysop.backend.device.opencl import cl

        ifields = {f.dfield for f in ivfields}
        ofields = {f.dfield for f in ovfields}
        check_instance(ifields, set, values=DiscreteScalarField)
        check_instance(ofields, set, values=DiscreteScalarField)

        for field in ifields.union(ofields):
            flags = 0
            if field in ifields:
                flags |= cl.map_flags.READ
            if field in ofields:
                flags |= cl.map_flags.WRITE
            assert field._data is not None
            self.register_mappable_object(
                key=field, obj=field._data.handle, flags=flags
            )

        for vfield in vfields:
            self.register_data_getter(
                get_key=vfield,
                obj_key=vfield.dfield,
                getter=vfield._compute_data_view,
            )

    def register_mappable_object(self, key, obj, flags):
        """
        Register a device array to be mapped to host by map_objects().

        Parameters
        ----------
        key:
            Identifier of the object; must not be registered already.
        obj: clArray.Array
            Device array to map.
        flags:
            Map flags handed to obj.map_to_host() at mapping time.
        """
        from hysop.backend.device.opencl import clArray

        assert (
            key not in self.__registered_objects
        ), f'Device memory object "{key}" has already been registered.'
        check_instance(obj, clArray.Array)
        self.__registered_objects[key] = (obj, flags)

    def register_data_getter(self, get_key, obj_key, getter):
        """
        Register a callable that derives an object from a mapped buffer.

        Parameters
        ----------
        get_key:
            Identifier of the derived object; must not clash with a
            registered object or with another getter.
        obj_key:
            Key of an already registered mappable object.
        getter: callable
            Called at mapping time with the host buffer mapped for obj_key;
            its result becomes the mapped object of get_key.
        """
        assert callable(getter)
        assert (
            get_key not in self.__registered_objects
        ), f'Device memory getter "{get_key}" has already been registered as an object.'
        assert (
            get_key not in self.__registered_getters
        ), f'Device memory getter "{get_key}" has already been registered as a getter.'
        assert (
            obj_key in self.__registered_objects
        ), f'Device memory object "{obj_key}" has not been registered.'
        self.__registered_getters[get_key] = (obj_key, getter)

    def map_objects(self, queue, is_blocking):
        """
        Map all registered device objects to host and apply all getters.

        Parameters
        ----------
        queue:
            Queue handed to map_to_host().
        is_blocking: bool
            Handed to map_to_host(). In blocking mode map_to_host() is
            expected to return the host buffer; otherwise a
            (host buffer, event) pair.

        Returns
        -------
        The event of the last non-blocking mapping, or None in blocking mode
        or when no object is registered. Events of earlier mappings are not
        returned.
        """
        DEBUG = False

        msg = "Device memory objects have already been mapped to host."
        assert not self.__mapped, msg

        evt = None
        try:
            for obj_key, (dev_buf, flags) in self.__registered_objects.items():
                if DEBUG:
                    msg = f"Mapping {obj_key.full_tag}..."
                    print(msg)
                mapping = dev_buf.map_to_host(
                    queue=queue, is_blocking=is_blocking, flags=flags
                )
                if is_blocking:
                    host_buf = mapping
                else:
                    host_buf, evt = mapping
                self.__mapped_objects[obj_key] = host_buf

            for get_key, (obj_key, getter) in self.__registered_getters.items():
                if DEBUG:
                    msg = f"Applying getter {get_key.full_tag} to mapped buffer {obj_key.full_tag}..."
                    print(msg)
                self.__mapped_objects[get_key] = getter(self.__mapped_objects[obj_key])
        except BaseException:
            # Do not keep a half-built mapping around: drop what was mapped
            # so far (the same release unmap_objects() performs) and leave
            # the instance in the unmapped state.
            self.__mapped_objects.clear()
            raise

        self.__mapped = True
        return evt

    def unmap_objects(self, force=False):
        """
        Drop all mapped host objects and mark the instance as unmapped.

        Parameters
        ----------
        force: bool
            When False, unmapping while nothing is mapped is an error.
        """
        msg = "Device memory objects have already been unmapped from host."
        assert force or self.__mapped, msg
        self.__mapped_objects.clear()
        self.__mapped = False

    def get_mapped_object(self, key):
        """
        Return the host object mapped under key.

        Only valid while objects are mapped; key may identify either a
        registered object or a registered getter.
        """
        msg = "Device memory objects have not been mapped to host yet."
        assert self.__mapped, msg
        assert (
            key in self.__mapped_objects
        ), f'Device memory object "{key}" has not been mapped.'
        return self.__mapped_objects[key]

    def build_object_getter(self, key):
        """
        Return a callable that fetches the mapped object of a registered key.

        The key must identify a registered mappable object. The returned
        callable takes no argument and is only usable while objects are
        mapped.
        """
        import functools

        assert (
            key in self.__registered_objects
        ), f'Device memory object "{key}" has not been registered.'
        return functools.partial(self.get_mapped_object, key=key)

    @contextmanager
    def map_objects_to_host(self, queue=None, is_blocking=True):
        """
        Context manager mapping registered objects to host for its duration.

        Parameters
        ----------
        queue: optional
            Queue used for the mapping; defaults to cl_env.default_queue.
        is_blocking: bool
            Handed to map_objects().

        Yields
        ------
        An OpenClMappedMemoryObjectGetter bound to this instance and to the
        event returned by map_objects(), or None when no object has been
        registered.
        """
        if not self.__registered_objects:
            yield
            return

        assert self.cl_env is not None, "No opencl environment has been given."
        queue = first_not_None(queue, self.cl_env.default_queue)

        # Map before entering the try block: if mapping fails (for instance
        # because objects are already mapped by an enclosing context), this
        # context owns no mapping and must not unmap anything. Unmapping
        # here would either mask the real error or tear down the mapping of
        # the enclosing context.
        evt = self.map_objects(queue, is_blocking)
        try:
            yield OpenClMappedMemoryObjectGetter(self, evt)
        finally:
            self.unmap_objects()